package com.rabbitmq.origin.api;

import com.rabbitmq.client.*;
import com.rabbitmq.common.utils.RabbitMqUtil;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

/**
 * @author
 * @Describe 直连消息队列进行消息消费，一个队列-N个消费者。
 * 1.默认情况下，rabbitmq按顺序将消息发送给每一个消费者，每个消费者得到的消息数量是平均数，该方式是轮询发送
 * 2.可通过消息确认方式来处理各个消费者处理速度不同或者有消费者发生异常导致消息异常处理，实现能者多劳或者可进行异常处理
 * @date
 */
@Slf4j(topic = "RabbitMqConsumerQueue")
public class RabbitMqConsumerQueue {

    /**
     * 直连消息队列进行消费,rabbitmq轮询发送消息,自动确认消息消费
     *
     * @param queueName
     */
    public void consumerQueue(String queueName, int flag) {
        Connection conn = null;
        Channel channel = null;
        try {
            // 获取连接
            conn = RabbitMqUtil.getConn();
            // 创建通道（通道可用于消息的发送和接收）
            channel = conn.createChannel();

            // 声明队列，已存在的不会创建(消费指定队列的消息时声明队列要和生产者声明的队列属性一致)
            channel.queueDeclare(queueName, true, false, false, null);

            // 创建队列消费者
            String threadName = Thread.currentThread().getName();
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    if (flag % 3 == 1) {
                        try {
                            Thread.sleep(1000);
                        } catch (Exception e) {
                            log.error("Thread.sleep 异常", e);
                        }
                    }
                    log.info(threadName + " consumer-" + flag + " handleDelivery " + envelope.getRoutingKey() + ":'" + message + "'");
                }
            };

            // 消息消费,自动确认模式为true
            channel.basicConsume(queueName, true, "queueConsumer", consumer);
        } catch (Exception e) {
            log.error("消费消息异常,", e);
            // 资源关闭
            RabbitMqUtil.close(conn, channel);
        }
    }

    /**
     * 直连消息队列进行消费,使用手工确认机制处理
     *
     * @param queueName
     */
    public void consumerQueueManualConfirmed(String queueName, int flag) {
        Connection conn = null;
        final Channel channel;
        try {
            // 获取连接
            conn = RabbitMqUtil.getConn();
            // 创建通道（通道可用于消息的发送和接收）
            channel = conn.createChannel();

            // 设置服务器将传递的最大消息数为1，此时只能消费一个消息，如果没有限制则为 0
            channel.basicQos(1);

            // 声明队列，已存在的不会创建(消费指定队列的消息时声明队列要和生产者声明的队列属性一致)
            channel.queueDeclare(queueName, true, false, false, null);

            // 创建队列消费者
            String threadName = Thread.currentThread().getName();
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    if (flag == 1) {
                        try {
                            Thread.sleep(2000);
                        } catch (Exception e) {
                            log.error("Thread.sleep 异常", e);
                        }
                    }

                    if (flag == 2) {
                        try {
                            Thread.sleep(1000);
                        } catch (Exception e) {
                            log.error("Thread.sleep 异常", e);
                        }
                    }

                    log.info(threadName + " consumer-" + flag + " handleDelivery " + envelope.getRoutingKey() + ":'" + message + "'");

                    // 手动确认消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    // log.info(threadName + "手工确认完毕");
                }
            };

            // 消息消费，自动确认消息设置为false
            channel.basicConsume(queueName, false, "queueManualConfirmConsumer", consumer);
        } catch (Exception e) {
            log.error("消费消息异常,", e);
            // 资源关闭
            RabbitMqUtil.close(conn, null);
        }
    }

    /**
     * 获取单条消息进行消费
     *
     * @param queueName
     */
    public void consumerSingle(String queueName) {
        Connection conn = null;
        final Channel channel;
        try {
            // 获取连接
            conn = RabbitMqUtil.getConn();
            // 创建通道（通道可用于消息的发送和接收）
            channel = conn.createChannel();

            // 声明队列，已存在的不会创建(消费指定队列的消息时声明队列要和生产者声明的队列属性一致)
            channel.queueDeclare(queueName, true, false, false, null);

            // 单条消息获取(是否自动确认)
            // GetResponse getResponse = channel.basicGet(queueName, true);
            GetResponse getResponse = channel.basicGet(queueName, false);
            if (null == getResponse) {
                log.info("获取单条消息为空");
            } else {
                // 获取消息体
                String msg = new String(getResponse.getBody(), "UTF-8");
                // 获取BasicProperties
                AMQP.BasicProperties props = getResponse.getProps();
                // 获取消息id
                long deliveryTag = getResponse.getEnvelope().getDeliveryTag();
                log.info(deliveryTag + ":" + msg + ":" + props.getHeaders() + ":" + getResponse.getMessageCount());
                // 手动确认消费
                channel.basicAck(deliveryTag, false);
            }
        } catch (Exception e) {
            log.error("消费消息异常,", e);
            // 资源关闭
            RabbitMqUtil.close(conn, null);
        }
    }
}
